from influxdb import DataFrameClient

from bqtool.tgsdata.kafka_kit import Kafka_consumer
import time

from bqtool.utils.tools import to_pdtime_ms, to_pdtime_str

# Script body: consume batches from the 'passcar' Kafka topic and write each
# batch to InfluxDB, then report how many rows were written.
client = DataFrameClient('10.2.111.65', database='mydb')
consumer = Kafka_consumer(['10.150.27.143:9092'], 'passcar', groupid='xinxiang2')
try:
    data = consumer.read_df()
    # Running total of rows handed to InfluxDB; reported after the loop ends.
    count = 0
    print('开始')
    # NOTE(review): each item is used as a pandas DataFrame below; presumably
    # read_df() yields one DataFrame per consumed batch -- confirm.
    for message in data:
        # Convert PASSTIME with the project helper and use it as the index,
        # which DataFrameClient takes as the point timestamps.
        message.loc[:, 'PASSTIME'] = to_pdtime_str(message['PASSTIME'], tz=False)
        message = message.set_index('PASSTIME')
        # Drop whichever column is first after PASSTIME became the index.
        message.drop(message.columns[0], axis=1, inplace=True)
        # Replace missing values with empty strings before writing.
        message = message.fillna('')
        # The second positional argument of write_points is the measurement
        # name (here also 'mydb'); CARPLATE is stored as a tag.
        client.write_points(message, 'mydb', tag_columns=["CARPLATE"], protocol='json')
        # Count only after a successful write so the total reflects rows
        # actually sent (previously the counter was never updated).
        count += len(message)
    print(f'计数{count}')
except Exception as e:
    # Top-level boundary: report the failure and fall through to 'end'.
    print(e)
print('end')